use crate::server::json;
use futures::FutureExt;
use hyper::{
    body::{Buf, Bytes},
    header, Body, StatusCode,
};
use linkerd_app_core::{
    trace::{self},
    Error,
};
use trace::EnvFilter;
use tracing::instrument::WithSubscriber;

/// Unwraps a `Result` inside a handler, or bails out of the enclosing function
/// with a JSON error response.
///
/// On `Ok`, the macro evaluates to the contained value. On `Err`, it logs a
/// warning carrying the error, `$status` and `$msg`, then `return`s
/// `Ok(json::json_error_rsp(error, $status))` from the surrounding function.
///
/// Note that `$status` is expanded twice on the error path (once for the log
/// line and once for the response), so it should be a side-effect-free
/// expression. A trailing comma after `$status` is accepted.
macro_rules! recover {
    ($thing:expr, $msg:literal, $status:expr $(,)?) => {
        match $thing {
            Err(error) => {
                tracing::warn!(%error, status = %$status, message = %$msg);
                return Ok(json::json_error_rsp(error, $status));
            }
            Ok(recovered) => recovered,
        }
    };
}

/// Starts a new log stream and serves it as the response body.
///
/// The requested log filter is read from:
///
/// - the URI query string, when the request method is `GET`;
/// - the request body (which must be valid UTF-8), when the method is `QUERY`.
///
/// Any other method is answered with `405 Method Not Allowed` and `Allow`
/// headers listing `GET` and `QUERY`.
///
/// Once the filter has been parsed and a stream has been registered through
/// `handle`, a background task is spawned that copies each log line into a
/// channel-backed response body. Whenever the stream reports dropped events, a
/// `{"dropped_events": N}` JSON object is written to the body as well.
///
/// # Errors
///
/// Every failure handled in this function is reported to the client as an
/// `Ok` response rather than an `Err`:
///
/// - the response produced by `json::accepts_json` when it rejects the request;
/// - `400 Bad Request` when the query string is missing, the body cannot be
///   read or is not UTF-8, or the filter does not parse;
/// - `500 Internal Server Error` when the stream cannot be started
///   (presumably when log streaming is not enabled on `handle` — confirm
///   against `add_stream`).
pub async fn serve<B>(
    handle: trace::Handle,
    req: http::Request<B>,
) -> Result<http::Response<Body>, Error>
where
    B: hyper::body::HttpBody,
    B::Error: Into<Error>,
{
    let handle = handle.into_stream();

    if let Err(not_acceptable) = json::accepts_json(&req) {
        return Ok(not_acceptable);
    }

    let try_filter = match req.method() {
        // If the request is a GET, use the query string as the requested log filter.
        &http::Method::GET => {
            let query = req
                .uri()
                .query()
                .ok_or("Missing query string for log-streaming filter");
            tracing::trace!(req.query = ?query);
            let query = recover!(query, "Missing query string", StatusCode::BAD_REQUEST);
            parse_filter(query)
        }
        // If the request is a QUERY, use the request body
        method if method.as_str() == "QUERY" => {
            // TODO(eliza): validate that the request has a content-length...
            let mut body = recover!(
                hyper::body::aggregate(req.into_body())
                    .await
                    .map_err(Into::into),
                "Reading log stream request body",
                StatusCode::BAD_REQUEST
            );

            // The aggregated buffer is not guaranteed to be contiguous:
            // `Buf::chunk` may expose only the first segment when the body
            // arrived in several frames. Flatten everything that remains so
            // the whole filter is decoded, not just its first chunk.
            let body_len = body.remaining();
            let body_bytes = body.copy_to_bytes(body_len);

            let body_str = recover!(
                std::str::from_utf8(&body_bytes),
                "Parsing log stream filter",
                StatusCode::BAD_REQUEST,
            );

            parse_filter(body_str)
        }
        method => {
            tracing::warn!(?method, "Unsupported method");
            return Ok(http::Response::builder()
                .status(StatusCode::METHOD_NOT_ALLOWED)
                .header(header::ALLOW, "GET")
                .header(header::ALLOW, "QUERY")
                .body(Body::empty())
                .expect("builder with known status code must not fail"));
        }
    };

    let filter = recover!(
        try_filter,
        "Parsing log stream filter",
        StatusCode::BAD_REQUEST,
    );

    let rx = recover!(
        handle.add_stream(filter),
        "Starting log stream",
        StatusCode::INTERNAL_SERVER_ERROR
    );

    // TODO(eliza): it's currently a bit sad that we have to use `Body::channel`
    // and spawn a worker task to poll from the log stream and write to the
    // response body using `Bytes::copy_from_slice` --- this allocates and
    // `memcpy`s the buffer, which is unfortunate.
    //
    // https://github.com/hawkw/thingbuf/issues/62 would allow us to avoid the
    // copy by passing the channel's pooled buffer directly to hyper, and
    // returning it to the channel to be reused when hyper is done with it.
    let (mut tx, body) = Body::channel();
    tokio::spawn(
        async move {
            // TODO(eliza): we could definitely implement some batching here.
            while let Some(line) = rx.next_line().await {
                // A send error (propagated by `?`) ends the task; it is only
                // logged by the `map` below.
                tx.send_data(Bytes::copy_from_slice(line.as_ref())).await?;

                // if any log events were dropped, report that to the client
                let dropped = rx.take_dropped_count();
                if dropped > 0 {
                    let json =
                        serde_json::to_vec(&serde_json::json!({ "dropped_events": dropped }))?;
                    tx.send_data(Bytes::from(json)).await?;
                }
            }

            Ok(())
        }
        // send any logs generated by the log streaming task to /dev/null
        .with_subscriber(tracing::subscriber::NoSubscriber::default())
        .map(|res: Result<(), Error>| {
            tracing::debug!(?res, "Log stream completed");
        }),
    );

    Ok(mk_rsp(StatusCode::OK, body))
}

/// Parses `filter_str` into an [`EnvFilter`], with regex matching turned off
/// on the filter builder.
///
/// The parse outcome (success or failure) is recorded at `TRACE` level
/// together with the input string before being handed back to the caller.
fn parse_filter(filter_str: &str) -> Result<EnvFilter, impl std::error::Error> {
    let builder = EnvFilter::builder().with_regex(false);
    let parsed = builder.parse(filter_str);
    // Field names are spelled out so the emitted event keeps the `filter` key.
    tracing::trace!(filter = ?parsed, ?filter_str);
    parsed
}

/// Builds a response with the given `status` and `body`, and a `Content-Type`
/// header set to a clone of `json::JSON_HEADER_VAL`.
///
/// # Panics
///
/// Panics if the response builder reports an error; the `expect` message
/// records the assumption that this cannot happen for a known status code.
fn mk_rsp(status: StatusCode, body: impl Into<Body>) -> http::Response<Body> {
    let payload: Body = body.into();
    let content_type = json::JSON_HEADER_VAL.clone();

    let builder = http::Response::builder()
        .status(status)
        .header(header::CONTENT_TYPE, content_type);

    builder
        .body(payload)
        .expect("builder with known status code must not fail")
}
